
package com.shujia.spark.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo5ReadKafka {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("streaming")
      .setMaster("local[2]") //local mode with 2 threads


    /**
      * Create the streaming context and set the batch interval,
      * i.e. how often each batch is computed
      */
    val ssc = new StreamingContext(conf, Durations.seconds(5))


    //checkpoint directory, required by the stateful updateStateByKey below
    ssc.checkpoint("data/checkpoint")


    //read data from Kafka


    //Kafka connection parameters
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "master:9092,node1:9092,node2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "asdasdasdas",
      "auto.offset.reset" -> "latest", //latest：读取新的数据
      "enable.auto.commit" -> "false"
    )
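
    /**
      * Hedged sketch (added, not part of the original demo): with
      * auto.offset.reset = "latest" a new consumer group starts from the newest
      * records. To start from fixed offsets instead, ConsumerStrategies.Assign
      * can pin partitions and starting offsets; it would replace Subscribe in
      * createDirectStream below. The partition and offset values here are
      * hypothetical.
      */
    //import org.apache.kafka.common.TopicPartition
    //import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
    //val fromOffsets = Map(new TopicPartition("test_topic1", 0) -> 0L)
    //val assign = Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)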


    //topic list
    val topics = Array("test_topic1")

    /**
      * createDirectStream: the direct (receiver-less) approach, Spark
      * actively pulls data from Kafka
      */

    val linesDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
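
    /**
      * Hedged sketch (added, not part of the original demo): because
      * enable.auto.commit is false, offsets are never committed automatically.
      * The standard spark-streaming-kafka-0-10 pattern below reads each batch's
      * offset ranges and commits them back to Kafka asynchronously.
      */
    linesDS.foreachRDD { rdd =>
      import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //commit once the batch has been handled
      linesDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }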

    /**
      * Kafka records are key/value pairs; the key defaults to null
      * and is usually not needed
      */
    linesDS
      .map(record => (record.key(), record.value())) //(key, value) pairs; the key is null here
      .map(_._2) //keep only the value
      .flatMap(_.split(","))
      .map((_, 1))
      //running total: this batch's counts plus the previous state for each word
      .updateStateByKey((seq: Seq[Int], opt: Option[Int]) => Some(seq.sum + opt.getOrElse(0)))
      .print()
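
    /**
      * Hedged comparison sketch (added): the same word count without state,
      * computed per batch only; unlike updateStateByKey it does not accumulate
      * across batches and needs no checkpoint. Uncomment to compare with the
      * stateful version above.
      */
    //linesDS
    //  .map(_.value())
    //  .flatMap(_.split(","))
    //  .map((_, 1))
    //  .reduceByKey(_ + _)
    //  .print()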


    ssc.start() //start the computation
    ssc.awaitTermination() //block until the job is stopped or fails
    ssc.stop()


  }

}

